package etcd

import (
	"fmt"
	"strings"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	kuberuntime "k8s.io/apimachinery/pkg/runtime"
	clientset "k8s.io/client-go/kubernetes"
	clientsetscheme "k8s.io/client-go/kubernetes/scheme"

	operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
	"github.com/karmada-io/karmada/operator/pkg/constants"
	"github.com/karmada-io/karmada/operator/pkg/util"
	"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
)

// EnsureKarmadaEtcd creates etcd StatefulSet and service resource.
func EnsureKarmadaEtcd(client clientset.Interface, cfg *operatorv1alpha1.LocalEtcd, name, namespace string) error {
	if err := installKarmadaEtcd(client, name, namespace, cfg); err != nil {
		return err
	}
	return createEtcdService(client, name, namespace)
}

func installKarmadaEtcd(client clientset.Interface, name, namespace string, cfg *operatorv1alpha1.LocalEtcd) error {
	// if the number of etcd is greater than one, we need to concatenate the peerURL for each member cluster.
	// memberName is podName generated by etcd statefuleset: ${statefulsetName}-index
	// memberPeerURL uses the etcd peer headless service name: ${podName}.${serviceName}.${namespace}.svc.cluster.local:2380
	initialClusters := make([]string, *cfg.Replicas)
	for index := range initialClusters {
		memberName := fmt.Sprintf("%s-%d", util.KarmadaEtcdName(name), index)

		// build etcd member cluster peer url
		memberPeerURL := fmt.Sprintf("http://%s.%s.%s.svc.cluster.local:%v",
			memberName,
			util.KarmadaEtcdName(name),
			namespace,
			constants.EtcdListenPeerPort,
		)

		initialClusters[index] = fmt.Sprintf("%s=%s", memberName, memberPeerURL)
	}

	etcdStatefuleSetBytes, err := util.ParseTemplate(KarmadaEtcdStatefulSet, struct {
		StatefulSetName, Namespace, Image                  string
		EtcdClientService, CertsSecretName                 string
		EtcdPeerServiceName, InitialCluster                string
		Replicas, EtcdListenClientPort, EtcdListenPeerPort int32
	}{
		StatefulSetName:      util.KarmadaEtcdName(name),
		Namespace:            namespace,
		Image:                cfg.Image.Name(),
		EtcdClientService:    util.KarmadaEtcdClientName(name),
		CertsSecretName:      util.EtcdCertSecretName(name),
		EtcdPeerServiceName:  util.KarmadaEtcdName(name),
		InitialCluster:       strings.Join(initialClusters, ","),
		Replicas:             *cfg.Replicas,
		EtcdListenClientPort: constants.EtcdListenClientPort,
		EtcdListenPeerPort:   constants.EtcdListenPeerPort,
	})
	if err != nil {
		return fmt.Errorf("error when parsing Etcd statefuelset template: %w", err)
	}

	etcdStatefulSet := &appsv1.StatefulSet{}
	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdStatefuleSetBytes, etcdStatefulSet); err != nil {
		return fmt.Errorf("error when decoding Etcd StatefulSet: %w", err)
	}

	if err := apiclient.CreateOrUpdateStatefulSet(client, etcdStatefulSet); err != nil {
		return fmt.Errorf("error when creating Etcd statefulset, err: %w", err)
	}

	return nil
}

func createEtcdService(client clientset.Interface, name, namespace string) error {
	etcdServicePeerBytes, err := util.ParseTemplate(KarmadaEtcdPeerService, struct {
		ServiceName, Namespace                   string
		EtcdListenClientPort, EtcdListenPeerPort int32
	}{
		ServiceName:          util.KarmadaEtcdName(name),
		Namespace:            namespace,
		EtcdListenClientPort: constants.EtcdListenClientPort,
		EtcdListenPeerPort:   constants.EtcdListenPeerPort,
	})
	if err != nil {
		return fmt.Errorf("error when parsing Etcd client serive template: %w", err)
	}

	etcdPeerService := &corev1.Service{}
	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdServicePeerBytes, etcdPeerService); err != nil {
		return fmt.Errorf("error when decoding Etcd client service: %w", err)
	}

	if err := apiclient.CreateOrUpdateService(client, etcdPeerService); err != nil {
		return fmt.Errorf("error when creating etcd client service, err: %w", err)
	}

	etcdClientServiceBytes, err := util.ParseTemplate(KarmadaEtcdClientService, struct {
		ServiceName, Namespace string
		EtcdListenClientPort   int32
	}{
		ServiceName:          util.KarmadaEtcdClientName(name),
		Namespace:            namespace,
		EtcdListenClientPort: constants.EtcdListenClientPort,
	})
	if err != nil {
		return fmt.Errorf("error when parsing Etcd client serive template: %w", err)
	}

	etcdClientService := &corev1.Service{}
	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdClientServiceBytes, etcdClientService); err != nil {
		return fmt.Errorf("err when decoding Etcd client service: %w", err)
	}

	if err := apiclient.CreateOrUpdateService(client, etcdClientService); err != nil {
		return fmt.Errorf("err when creating etcd client service, err: %w", err)
	}

	return nil
}
